package com.jie.flink.cdc.flinksink;

import com.jie.flink.cdc.flinksink.config.ElasticsearchConfigProperties;
import com.jie.flink.cdc.flinksink.config.KafkaConfigProperties;
import com.jie.flink.cdc.flinksink.config.PulsarConfigProperties;
import com.jie.flink.cdc.flinksink.config.SinkLogConfigProperties;
import lombok.Data;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.http.HttpHost;
import org.bouncycastle.util.Arrays;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring component that builds the Flink sinks used by the CDC job (Kafka, Pulsar,
 * Elasticsearch and a log sink) and registers each built sink in {@link #sinkMap}
 * under a fixed key so it can be looked up by name.
 *
 * <p>Every factory method is guarded by {@code @ConditionalOnBean} on its configuration
 * properties class, so a sink is only built when that properties bean is present.
 * NOTE(review): {@code @ConditionalOnBean} is evaluated against the bean definitions
 * processed so far; confirm the properties beans are always registered before this
 * component is processed.
 *
 * @author zhanggj
 * @date 2023/3/1 15:16
 */
@Data
@Component
//@ConditionalOnExpression("!'${jie.flink-cdc.stream.sink}'.isEmpty()")
public class FlinkSinkInfo {
    /** Key under which the Kafka sink is stored in {@link #sinkMap}. */
    private static final String KAFKA_KEY = "kafka";
    /** Key under which the Pulsar sink is stored in {@link #sinkMap}. */
    private static final String PULSAR_KEY = "pulsar";
    /** Key under which the Elasticsearch sink is stored in {@link #sinkMap}. */
    private static final String ELASTICSEARCH_KEY = "elasticsearch";
    /** Key under which the log sink is stored in {@link #sinkMap}. */
    private static final String LOG_SINK_KEY = "log-sink";

    /** Sinks built by this component, keyed by sink name; filled as the bean methods run. */
    private Map<String ,Sink<String>> sinkMap = new HashMap<>();

    /**
     * Builds an at-least-once Kafka sink that writes each record as a plain string value
     * to the configured topic, and registers it under the key {@code "kafka"}.
     *
     * @param kafkaConfigProperties supplies the bootstrap servers and the target topic
     * @return the Kafka sink, also exposed as the {@code kafkaSink} bean
     */
    @Bean("kafkaSink")
    @ConditionalOnBean(KafkaConfigProperties.class)
    public KafkaSink<String> buildKafkaSink(KafkaConfigProperties kafkaConfigProperties) {
        final KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers(kafkaConfigProperties.getBootstrapServer())
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(kafkaConfigProperties.getTopic())
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        this.sinkMap.put(KAFKA_KEY, kafkaSink);
        return kafkaSink;
    }

    /**
     * Builds a Pulsar sink that writes each record as a plain string to the configured
     * topic, and registers it under the key {@code "pulsar"}.
     *
     * @param pulsarConfigProperties supplies the admin URL, service URL and target topic
     * @return the Pulsar sink, also exposed as the {@code pulsarSink} bean
     */
    @Bean("pulsarSink")
    @ConditionalOnBean(PulsarConfigProperties.class)
    public PulsarSink<String> buildPulsarSink(PulsarConfigProperties pulsarConfigProperties) {
        final PulsarSink<String> pulsarSink = PulsarSink.<String>builder()
                .setAdminUrl(pulsarConfigProperties.getAdminUrl())
                .setServiceUrl(pulsarConfigProperties.getServiceUrl())
                .setTopics(pulsarConfigProperties.getTopic())
                .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
                .build();
        this.sinkMap.put(PULSAR_KEY, pulsarSink);
        return pulsarSink;
    }

    /**
     * Builds an Elasticsearch 7 sink for the configured hosts and registers it under the
     * key {@code "elasticsearch"}. Credentials are applied only when they are configured.
     *
     * @param elasticsearchConfigProperties supplies the host list and optional user name
     *                                      and password
     * @return the Elasticsearch sink, or {@code null} when no hosts are configured (in
     *         which case nothing is added to {@link #sinkMap})
     */
    @Bean("esSink")
    @ConditionalOnBean(ElasticsearchConfigProperties.class)
    public ElasticsearchSink<String> buildEsSink(ElasticsearchConfigProperties elasticsearchConfigProperties) {
        final String[] hosts = elasticsearchConfigProperties.getHosts();
        if (hosts == null || hosts.length == 0) {
            // NOTE(review): a null return from a @Bean method leaves no usable sink bean;
            // confirm that consumers of "esSink" tolerate its absence.
            return null;
        }
        final HttpHost[] httpHosts = new HttpHost[hosts.length];
        for (int i = 0; i < hosts.length; i++) {
            httpHosts[i] = HttpHost.create(hosts[i]);
        }

        // Parameterised builder (not a raw type) so the resulting sink is type-checked.
        Elasticsearch7SinkBuilder<String> sinkBuilder = new Elasticsearch7SinkBuilder<String>()
                .setHosts(httpHosts)
                .setEmitter(new ElasticsearchEmitterImpl());

        // The builder's credential setters are expected to reject null, so only set
        // credentials that are actually configured (clusters without authentication).
        final String userName = elasticsearchConfigProperties.getUserName();
        if (userName != null) {
            sinkBuilder = sinkBuilder.setConnectionUsername(userName);
        }
        final String password = elasticsearchConfigProperties.getPassword();
        if (password != null) {
            sinkBuilder = sinkBuilder.setConnectionPassword(password);
        }

        final ElasticsearchSink<String> elasticsearchSink = sinkBuilder.build();
        this.sinkMap.put(ELASTICSEARCH_KEY, elasticsearchSink);
        return elasticsearchSink;
    }

    /**
     * Creates the log sink and registers it under the key {@code "log-sink"}.
     *
     * @return the newly created log sink
     */
    @Bean
    @ConditionalOnBean(SinkLogConfigProperties.class)
    public FlinkLogSink buildLogSink() {
        final FlinkLogSink flinkLogSink = new FlinkLogSink();
        this.sinkMap.put(LOG_SINK_KEY, flinkLogSink);
        return flinkLogSink;
    }
}
